package wen

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, ALS, Rating}
import org.apache.spark.{SparkContext, SparkConf}

import scala.collection.immutable.HashMap

/**
  * Created by chenjianwen on 2016/2/29.
  * 这里使用的数据是spark自带的电影和用户数据，目录在SPARK_HOME/data/mllib/als/
  */
object MALSLeaning {
  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("ALS 练习")
    val sc = new SparkContext(conf)

    //该文件每一列的含义：UserID::MovieID::Rating::Timestamp
    //读入文件
    val ratings = sc.textFile("/usr/spark/spark-1.6.0-bin-hadoop2.6/data/mllib/als/sample_movielens_ratings.txt").map( line =>{
      line.split("::")match{
        case Array(u,m,r,t)=>{
          (t.toLong%10,Rating(u.toInt,m.toInt,r.toDouble))
        }
      }
      }
    )

    //读入电影文件，用来做推荐之后匹配其中的电影id，得到电影名字。文件每一行的格式：电影id::电影名字::电影类型
    val moviesArray = sc.textFile("/usr/spark/spark-1.6.0-bin-hadoop2.6/data/mllib/als/sample_movielens_movies.txt").map(line =>{
      line.split("::")match{
        case Array(id,name,mtype)=>{
         (id.toLong,(name,mtype))
        }
      }
    }).collect()
    //改造成一个Map,其实Array(id,xxx).toMap 也可以实现以下功能的。
    var moviesMap:Map[Long,(String, String)] = Map()
    moviesArray.foreach(x=>{
      moviesMap += (x._1->(x._2._1,x._2._2))
    })


    println(s"-------------------原始数据长度:${ratings.count()}--------------------")


    val rank = 5
    val numIterator = 5
    //ALS算法的正则化参数，为了防止训练模型时拟合过度。
    val lambda = 0.1

    /*
      ##############################
      ##把数据切分为几部分
      ##############################
     */
    //训练模型使用的数据
    val beTrain = ratings.filter(x =>x._1<4L).map(_._2)

    //用于对比预测之后的真实数据
    val test = ratings.filter(x =>x._1>=8L).map(_._2)
    //用来做预测的数据
    val prediction = beTrain.map(one => (one.user,one.product))


    /*
      ##############################
      ##  训练模型
      ##############################
*/
    println("------------------------开始训练模型-----------------------")
    val train = ALS.train(beTrain,rank,numIterator,lambda)
    //保存模型
    train.save(sc,"train")
    //加载模型方法： MatrixFactorizationModel.load(sc,"train")


    /*
    ##############################
    ##预测数据，和真实数据对比，求出误差(真实值和实际值的方差)大小
    ##############################
 */

    //利用模型进行预测，并且把结果转为((userId,productId),分数)
    val predictionResult = train.predict(prediction).map(x=>((x.user,x.product),x.rating))

    //把预测出来的评分结果和原有的真实评分数据进行join操作，便于对比准确性，结果为((userId,productId),(预测数据，真实数据))
    val validation = predictionResult.join(beTrain.map(x=>((x.user,x.product),x.rating)))


    //输出结果
   /* val all = validation.collect()
    println(s"------------------长度：${all.length}-------------------")
    all.foreach(x=>{
      println(x)
    })*/



    /*
        ##############################
        ##为某个用户推荐产品
        ##############################
     */
    //为id为29的用户推荐5部电影
    val recommendRating = train.recommendProducts(29,5)
    //得到Array[(电影名字,电影类型,评分)]
    val res = recommendRating.map(x =>{
      (moviesMap(x.product)._1,moviesMap(x.product)._2,x.rating)
    })
    println("为用户29 推荐的电影如下：")
    res.foreach(x=>{
      println(s"电影名字：${x._1}，电影类型：${x._2}，评分:${x._3}")
    })


    println("产品8 的特征是向量：")
    val proFeatures = train.productFeatures.lookup(8)
    proFeatures(0).foreach(println(_))


  }
}
